package com.fleapx.share.rabbit.tx;

import com.fleapx.share.rabbit.Utils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMQ事务示例
 *
 * @author zengchao
 * @date 2021-05-13 09:04:04
 */
@Slf4j
public class TxDemo {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = Utils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //创建Exchange
        channel.exchangeDeclare("tx.exchange", BuiltinExchangeType.DIRECT, true, false, new HashMap<>());
        //创建Queue
        channel.queueDeclare("tx.queue", true, false, false, new HashMap<>());
        //绑定路由
        channel.queueBind("tx.queue", "tx.exchange", "tx");

        //channel开启事务
        channel.txSelect();
        //发送3条消息
        String msgTemplate = "测试事务消息内容[%d]";
        channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,1).getBytes(StandardCharsets.UTF_8));
        channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,2).getBytes(StandardCharsets.UTF_8));
        channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,3).getBytes(StandardCharsets.UTF_8));
        //消息回滚
        channel.txRollback();
        //成功提交
        channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,4).getBytes(StandardCharsets.UTF_8));
        channel.txCommit();

        channel.close();
        connection.close();

    }
}
